Add Exchange before GroupId to improve Partial Aggregation #11741
findepi wants to merge 2 commits into prestodb:master from findepi:findepi/master/add-exchange-before-groupid-to-improve-partial-aggregation-16b5ea
The idea was abandoned during #11267 review.
The rule brings significant improvement in TPC-DS Q22 and Q67 while not causing much regression in other TPC-H, TPC-DS queries. (The only observably regressing queries were still much better than non-CBO baseline.)
kokosing left a comment:
LGTM because it already went through the internal review process.
Some nit comments.
 * GroupId (before multiplication) makes partial aggregation more effective, resulting in less data being
 * exchanged afterwards.
 */
public class AddExchangesBelowPartialAggregationOverGroupIdRuleSet
It would be nice to have some unit tests, but that would require a lot of plumbing.
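To illustrate the claim in the Javadoc above, here is a toy model (not code from this PR) of why partitioning rows on a grouping key before partial aggregation reduces the data exchanged afterwards. It ignores GroupId's row multiplication entirely and only counts how many distinct keys each worker would emit after a partial aggregation. The class name, the row-to-key mapping, and the worker counts are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.IntUnaryOperator;

// Toy model only: none of these names exist in Presto.
public class PartialAggregationToyModel
{
    // Counts the rows emitted by per-worker partial aggregation,
    // i.e. the sum over workers of the distinct keys each worker sees.
    static int partialAggregationOutputRows(int rows, int distinctKeys, int workers, IntUnaryOperator workerForRow)
    {
        List<Set<Integer>> keysPerWorker = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            keysPerWorker.add(new HashSet<>());
        }
        for (int row = 0; row < rows; row++) {
            int key = row % distinctKeys; // assumed key distribution
            keysPerWorker.get(workerForRow.applyAsInt(row)).add(key);
        }
        int total = 0;
        for (Set<Integer> keys : keysPerWorker) {
            total += keys.size();
        }
        return total;
    }

    // Rows arrive at workers without regard to the grouping key.
    static int roundRobin(int rows, int distinctKeys, int workers)
    {
        return partialAggregationOutputRows(rows, distinctKeys, workers, row -> row % workers);
    }

    // Rows are first exchanged so that each key lives on exactly one worker.
    static int hashPartitioned(int rows, int distinctKeys, int workers)
    {
        return partialAggregationOutputRows(rows, distinctKeys, workers, row -> (row % distinctKeys) % workers);
    }

    public static void main(String[] args)
    {
        System.out.println(roundRobin(1000, 10, 3));      // prints 30
        System.out.println(hashPartitioned(1000, 10, 3)); // prints 10
    }
}
```

In this model every worker sees every key when rows are spread round-robin, so the partial aggregation emits one row per key per worker. Once rows are partitioned on the key, each key is emitted only once overall.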
return transform(aggregation, groupId, context)
        .map(newAggregation -> {
            PlanNode newExchange = exchange.replaceChildren(ImmutableList.of(newAggregation));
            return Result.ofPlanNode(newExchange);
List<Symbol> desiredHashSymbols = groupingSetHistogram.entrySet().stream()
        // Take only frequently used symbols
        .filter(entry -> entry.getCount() >= groupId.getGroupingSets().size() * GROUPING_SETS_SYMBOL_REQUIRED_FREQUENCY)
What if all symbols are less frequent? Shouldn't we return Optional.empty() here (short-circuit)?
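A minimal sketch of the frequency filter discussed here, including the suggested short-circuit, might look like the following. It uses plain JDK collections instead of the Guava multiset implied by `entry.getCount()`, represents symbols as strings, and assumes a threshold of 0.5. The actual value of `GROUPING_SETS_SYMBOL_REQUIRED_FREQUENCY` is not shown in this excerpt, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative only: not the rule's actual implementation.
public class FrequentGroupingSymbols
{
    // Assumed value; the PR's constant is not visible in the excerpt.
    static final double REQUIRED_FREQUENCY = 0.5;

    // Returns the symbols that occur in at least REQUIRED_FREQUENCY of the
    // grouping sets, or Optional.empty() when no symbol is frequent enough.
    static Optional<List<String>> frequentSymbols(List<List<String>> groupingSets)
    {
        // Histogram: symbol -> number of grouping sets that contain it.
        Map<String, Integer> histogram = new LinkedHashMap<>();
        for (List<String> groupingSet : groupingSets) {
            for (String symbol : groupingSet) {
                histogram.merge(symbol, 1, Integer::sum);
            }
        }

        double threshold = groupingSets.size() * REQUIRED_FREQUENCY;
        List<String> frequent = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : histogram.entrySet()) {
            if (entry.getValue() >= threshold) {
                frequent.add(entry.getKey());
            }
        }

        // Short-circuit: nothing useful to partition on.
        return frequent.isEmpty() ? Optional.empty() : Optional.of(frequent);
    }

    public static void main(String[] args)
    {
        // Grouping sets of ROLLUP(a, b, c): (a, b, c), (a, b), (a), ()
        List<List<String>> rollup = List.of(
                List.of("a", "b", "c"),
                List.of("a", "b"),
                List.of("a"),
                List.of());
        System.out.println(frequentSymbols(rollup)); // prints Optional[[a, b]]
    }
}
```

With four grouping sets and the assumed threshold of 0.5, a symbol must appear in at least two sets, so `a` (3 sets) and `b` (2 sets) qualify while `c` (1 set) does not.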
StreamPreferredProperties requiredProperties = fixedParallelism().withPartitioning(desiredHashSymbols);
StreamProperties sourceProperties = derivePropertiesRecursively(groupId.getSource(), context);
if (requiredProperties.isSatisfiedBy(sourceProperties)) {
It would be nice to extract this as a separate rule that would remove unnecessary partial aggregations.
arhimondr left a comment:
Could you please provide some high level overview for the problem you are trying to solve?
public static final String LEGACY_ROW_FIELD_ORDINAL_ACCESS = "legacy_row_field_ordinal_access";
public static final String ITERATIVE_OPTIMIZER = "iterative_optimizer_enabled";
public static final String ITERATIVE_OPTIMIZER_TIMEOUT = "iterative_optimizer_timeout";
public static final String ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID = "enable_forced_exchange_below_group_id";
Maybe add_exchange_before_group_id?
private double spillMaxUsedSpaceThreshold = 0.9;
private boolean iterativeOptimizerEnabled = true;
private boolean enableStatsCalculator = true;
private boolean enableForcedExchangeBelowGroupId = true;
Can we have it disabled by default?
        new PushPartialAggregationThroughJoin(),
        new PushPartialAggregationThroughExchange(metadata.getFunctionRegistry()),
        new PruneJoinColumns())));
builder.add(new IterativeOptimizer(
This rule uses the cost model. The cost model loses its estimates when there is any partial aggregation in between.
 * GroupId (before multiplication) makes partial aggregation more effective, resulting in less data being
 * exchanged afterwards.
 */
public class AddExchangesBelowPartialAggregationOverGroupIdRuleSet
Maybe just AddExchangesBeforeGroupId?